package com.zhangpan.test;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Flink streaming job that reads lines of text from a socket, splits each line
 * on single spaces, and prints a running count per word.
 *
 * <p>Usage: {@code StreamWordCount [host [port]]}. When an argument is omitted the
 * job falls back to {@value #DEFAULT_HOST} and {@value #DEFAULT_PORT}.
 *
 * @author zhangpan
 */
public class StreamWordCount {

    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "hdp134";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 7777;

    /**
     * Builds and runs the word-count pipeline.
     *
     * @param args optional {@code [host, port]} of the text socket to read from
     * @throws IllegalArgumentException if the port argument is not an integer
     * @throws Exception if the Flink job fails while executing
     */
    public static void main(String[] args) throws Exception {

        // Resolve the source endpoint; defaults keep the no-argument launch unchanged.
        String host = (args != null && args.length > 0) ? args[0] : DEFAULT_HOST;
        int port = (args != null && args.length > 1) ? parsePort(args[1]) : DEFAULT_PORT;

        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the text stream from the socket.
        DataStreamSource<String> lineStream = env.socketTextStream(host, port);

        // 3. Transform into (word, 1) pairs, group by word, and sum the counts.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        // split(" ") yields "" for blank lines and for leading or
                        // repeated spaces; those are not words and must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        // Print the running counts.
        sum.print();

        // 4. Execute the job.
        env.execute();
    }

    /**
     * Parses the port command-line argument.
     *
     * @param raw the port argument as given on the command line
     * @return the parsed port number
     * @throws IllegalArgumentException if {@code raw} is not a valid integer
     */
    private static int parsePort(String raw) {
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port argument: '" + raw + "'", e);
        }
    }
}
